package realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import realtime.app.func.BeanToJsonStrFunction;
import realtime.beans.TradePaymentBean;
import realtime.util.DateFormatUtil;
import realtime.util.DorisUtil;
import realtime.util.EnvUtil;
import realtime.util.KafkaUtil;

public class DwsTradePaymentSucWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = EnvUtil.getSEE(4);
        String topic = "dwd_trade_pay_detail_suc";
        String groupId = "dws_trade_payment_suc_window_group";
        KafkaSource<String> kafkaSource = KafkaUtil.getKafkaConsumer(topic, groupId);
        DataStreamSource<String> kafkaStrDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source");
        //转换数据结构
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaStrDS.map(JSON::parseObject);
        //设置水位线
        SingleOutputStreamOperator<JSONObject> watermarkDS = jsonObjDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<JSONObject>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                            @Override
                            public long extractTimestamp(JSONObject jsonObject, long l) {
                                return jsonObject.getLong("ts") * 1000;
                            }
                        })
        );
        //按照用户id分组
        KeyedStream<JSONObject, String> keyedDS = watermarkDS.keyBy(v1 -> v1.getString("user_id"));
        //对流中数据进行处理并转换为实体类
        SingleOutputStreamOperator<TradePaymentBean> processDS = keyedDS.process(
                new KeyedProcessFunction<String, JSONObject, TradePaymentBean>() {
                    private ValueState<String> lastPaySucDateState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("lastPaySucState", String.class);
                        valueStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(1)).build());
                        lastPaySucDateState = getRuntimeContext().getState(valueStateDescriptor);
                    }

                    @Override
                    public void processElement(JSONObject jsonObj, KeyedProcessFunction<String, JSONObject, TradePaymentBean>.Context ctx, Collector<TradePaymentBean> out) throws Exception {
                        String lastPaySucDate = lastPaySucDateState.value();
                        Long ts = jsonObj.getLong("ts") * 1000;
                        String curDate = DateFormatUtil.toDate(ts);
                        Long paymentSucUniqueUserCount = 0L;
                        if (StringUtils.isEmpty(lastPaySucDate) || !lastPaySucDate.equals(curDate)) {
                            paymentSucUniqueUserCount = 1L;
                            lastPaySucDateState.update(curDate);
                        }
                        out.collect(new TradePaymentBean(
                                "",
                                "",
                                "",
                                paymentSucUniqueUserCount
                        ));
                    }
                }
        );
        //开窗
        AllWindowedStream<TradePaymentBean, TimeWindow> windowDS = processDS.windowAll(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)));
        //聚合
        SingleOutputStreamOperator<TradePaymentBean> reduceDS = windowDS.reduce(
                new ReduceFunction<TradePaymentBean>() {
                    @Override
                    public TradePaymentBean reduce(TradePaymentBean v1, TradePaymentBean v2) throws Exception {
                        v1.setPaymentSucUniqueUserCount(v1.getPaymentSucUniqueUserCount() + v2.getPaymentSucUniqueUserCount());
                        return v1;
                    }
                },
                new AllWindowFunction<TradePaymentBean, TradePaymentBean, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<TradePaymentBean> values, Collector<TradePaymentBean> out) throws Exception {
                        String stt = DateFormatUtil.toYmdHms(window.getStart());
                        String edt = DateFormatUtil.toYmdHms(window.getEnd());
                        String curDate = DateFormatUtil.toDate(window.getStart());
                        for (TradePaymentBean value : values) {
                            value.setStt(stt);
                            value.setEdt(edt);
                            value.setCurDate(curDate);
                            out.collect(value);
                        }
                    }
                }
        );
        reduceDS.print(">>>>");
        reduceDS.map(new BeanToJsonStrFunction<>())
                .sinkTo(DorisUtil.getDorisSink("dws_trade_payment_suc_window"));
        env.execute();
    }
}
